Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): support set nats consumer deliver policy as latest, earliest, by sequence, by timestamp #12176

Merged
merged 5 commits into from
Sep 14, 2023

Conversation

yufansong
Copy link
Member

@yufansong yufansong commented Sep 8, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

This PR try to add several deliver policies (latest, earliest, by sequence, by timestamp).

Test the policy under

  1. normal source
  2. recover the data after CN break down. And also can correctly consume data

We decide to merge it into the #12227 firstly. Will use that branch to build a image for user.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Add the latest, earliest, by timestamp policy. Currently decide not support by sequence policy

default is earliest

CREATE TABLE test
(
  id integer,
  name varchar,
)
WITH (
  connector='nats',
    server_url='0.0.0.0:4222',
    subject='event1',
) FORMAT PLAIN ENCODE JSON;

latest policy, need to set scan.startup.mode='latest'

CREATE TABLE test_latest
(
  id integer,
  name varchar,
)
WITH (
  connector='nats',
    server_url='0.0.0.0:4222',
    subject='event1',
    scan.startup.mode='latest',
) FORMAT PLAIN ENCODE JSON;

earliest policy, need to set scan.startup.mode='earliest',

CREATE TABLE test_earliest
(
  id integer,
  name varchar,
)
WITH (
  connector='nats',
    server_url='0.0.0.0:4222',
    subject='event1',
    scan.startup.mode='earliest',
) FORMAT PLAIN ENCODE JSON;

// by timestamp millis policy, need to set scan.startup.mode='timestamp_millis', and also set nats.scan.startup.timestamp_millis

CREATE TABLE test_timestamp
(
  id integer,
  name varchar,
)
WITH (
  connector='nats',
    server_url='0.0.0.0:4222',
    subject='event1',
    scan.startup.mode='timestamp_millis',
    scan.startup.timestamp_millis='1694157849359',
) FORMAT PLAIN ENCODE JSON;

@yufansong yufansong requested a review from a team as a code owner September 8, 2023 07:45
@yufansong yufansong requested a review from tabVersion September 8, 2023 07:45
@hzxa21 hzxa21 changed the title feature(stream): support set nats consumer deliver policy as latest, earliest, by sequence, by timestamp feat(stream): support set nats consumer deliver policy as latest, earliest, by sequence, by timestamp Sep 8, 2023
@yufansong yufansong marked this pull request as draft September 8, 2023 07:52
@yufansong yufansong requested a review from StrikeW September 8, 2023 07:56
let consumer = properties
.common
.build_consumer(0, splits[0].start_sequence)
.build_consumer(0, start_position.clone())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a bad practice to hardcode the split_id 0 everywhere in the code. Here you should get the split id from the input split. And I suggest that you can save the split id to the SplitReader when you create the reader. Then you can use the split_id to build a SourceMessage instead of hardcoding it in the from_nats_jetstream_message.
Btw you may impl From<NatsMessage> for SourceMessage to do the conversion.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for you remind. I already applied your suggestions. :)

Timestamp(i128),
None,
}

/// The states of a NATS split, which will be persisted to checkpoint.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct NatsSplit {
pub(crate) subject: String,
// TODO: to simplify the logic, return 1 split for first version. May use parallelism in
// future.
pub(crate) split_num: i32,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may refactor to split_id

@@ -41,7 +41,9 @@ chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = ["time"] }
clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems unintended modifications to this file.

Comment on lines +416 to +417
DeliverPolicy::ByStartSequence {
start_sequence: 1 + parsed,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the behavior is not consistent with other connectors, starts with seq should contain seqth message

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one actually try to fix the problem that when resume.

If we use by sequence policy, this one may cause problem (we need to manuelly minus 1 for input sequence number). But currently I apply your suggestion and remove the by sequence policy. Then this would be fine.

@yufansong yufansong requested a review from wenym1 September 12, 2023 07:34
@yufansong yufansong changed the base branch from main to yufan/nats-connect September 13, 2023 07:16
@yufansong yufansong marked this pull request as ready for review September 13, 2023 07:17
@tabVersion
Copy link
Contributor

it is kinda easy to misuse seq num, let remove by sequence support

relate to #12241

@codecov
Copy link

codecov bot commented Sep 14, 2023

Codecov Report

Merging #12176 (873c17b) into yufan/nats-connect (8bd524b) will decrease coverage by 0.02%.
Report is 25 commits behind head on yufan/nats-connect.
The diff coverage is 0.00%.

@@                  Coverage Diff                   @@
##           yufan/nats-connect   #12176      +/-   ##
======================================================
- Coverage               69.75%   69.74%   -0.02%     
======================================================
  Files                    1407     1407              
  Lines                  235565   235596      +31     
======================================================
- Hits                   164314   164309       -5     
- Misses                  71251    71287      +36     
Flag Coverage Δ
rust 69.74% <0.00%> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Changed Coverage Δ
src/connector/src/common.rs 2.63% <0.00%> (-0.08%) ⬇️
src/connector/src/source/nats/enumerator/mod.rs 0.00% <0.00%> (ø)
src/connector/src/source/nats/mod.rs 0.00% <ø> (ø)
src/connector/src/source/nats/source/message.rs 0.00% <0.00%> (ø)
src/connector/src/source/nats/source/reader.rs 0.00% <0.00%> (ø)
src/connector/src/source/nats/split.rs 0.00% <0.00%> (ø)

... and 2 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Contributor

@tabVersion tabVersion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge into #12227

@yufansong yufansong merged commit ddbf1f3 into yufan/nats-connect Sep 14, 2023
@yufansong yufansong deleted the yufan/nats-mode branch September 14, 2023 05:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants